{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 201,
   "metadata": {},
   "outputs": [],
   "source": [
    "#create spark session\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "spark=SparkSession.builder.appName('nlp').getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 202,
   "metadata": {},
   "outputs": [],
   "source": [
    "df=spark.createDataFrame([(1,'I really liked this movie'),\n",
    "                         (2,'I would recommend this movie to my friends'),\n",
    "                         (3,'movie was alright but acting was horrible'),\n",
    "                         (4,'I am never watching that movie ever again')],\n",
    "                        ['user_id','review'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 203,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+------------------------------------------+\n",
      "|user_id|review                                    |\n",
      "+-------+------------------------------------------+\n",
      "|1      |I really liked this movie                 |\n",
      "|2      |I would recommend this movie to my friends|\n",
      "|3      |movie was alright but acting was horrible |\n",
      "|4      |I am never watching that movie ever again |\n",
      "+-------+------------------------------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.show(5,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Tokenization"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 204,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import Tokenizer"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 205,
   "metadata": {},
   "outputs": [],
   "source": [
    "tokenization=Tokenizer(inputCol='review',outputCol='tokens')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 206,
   "metadata": {},
   "outputs": [],
   "source": [
    "tokenized_df=tokenization.transform(df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 207,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+------------------------------------------+---------------------------------------------------+\n",
      "|user_id|review                                    |tokens                                             |\n",
      "+-------+------------------------------------------+---------------------------------------------------+\n",
      "|1      |I really liked this movie                 |[i, really, liked, this, movie]                    |\n",
      "|2      |I would recommend this movie to my friends|[i, would, recommend, this, movie, to, my, friends]|\n",
      "|3      |movie was alright but acting was horrible |[movie, was, alright, but, acting, was, horrible]  |\n",
      "|4      |I am never watching that movie ever again |[i, am, never, watching, that, movie, ever, again] |\n",
      "+-------+------------------------------------------+---------------------------------------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "tokenized_df.show(4,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# stopwords removal "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 208,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import StopWordsRemover"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 209,
   "metadata": {},
   "outputs": [],
   "source": [
    "stopword_removal=StopWordsRemover(inputCol='tokens',outputCol='refined_tokens')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 210,
   "metadata": {},
   "outputs": [],
   "source": [
    "refined_df=stopword_removal.transform(tokenized_df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 211,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+---------------------------------------------------+----------------------------------+\n",
      "|user_id|tokens                                             |refined_tokens                    |\n",
      "+-------+---------------------------------------------------+----------------------------------+\n",
      "|1      |[i, really, liked, this, movie]                    |[really, liked, movie]            |\n",
      "|2      |[i, would, recommend, this, movie, to, my, friends]|[recommend, movie, friends]       |\n",
      "|3      |[movie, was, alright, but, acting, was, horrible]  |[movie, alright, acting, horrible]|\n",
      "|4      |[i, am, never, watching, that, movie, ever, again] |[never, watching, movie, ever]    |\n",
      "+-------+---------------------------------------------------+----------------------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "refined_df.select(['user_id','tokens','refined_tokens']).show(10,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Count Vectorizer"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 212,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import CountVectorizer"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 213,
   "metadata": {},
   "outputs": [],
   "source": [
    "count_vec=CountVectorizer(inputCol='refined_tokens',outputCol='features')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 214,
   "metadata": {},
   "outputs": [],
   "source": [
    "cv_df=count_vec.fit(refined_df).transform(refined_df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 215,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+----------------------------------+--------------------------------+\n",
      "|user_id|refined_tokens                    |features                        |\n",
      "+-------+----------------------------------+--------------------------------+\n",
      "|1      |[really, liked, movie]            |(11,[0,4,9],[1.0,1.0,1.0])      |\n",
      "|2      |[recommend, movie, friends]       |(11,[0,6,10],[1.0,1.0,1.0])     |\n",
      "|3      |[movie, alright, acting, horrible]|(11,[0,2,3,5],[1.0,1.0,1.0,1.0])|\n",
      "|4      |[never, watching, movie, ever]    |(11,[0,1,7,8],[1.0,1.0,1.0,1.0])|\n",
      "+-------+----------------------------------+--------------------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "cv_df.select(['user_id','refined_tokens','features']).show(4,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 218,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['movie',\n",
       " 'horrible',\n",
       " 'really',\n",
       " 'alright',\n",
       " 'liked',\n",
       " 'friends',\n",
       " 'recommend',\n",
       " 'never',\n",
       " 'ever',\n",
       " 'acting',\n",
       " 'watching']"
      ]
     },
     "execution_count": 218,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "count_vec.fit(refined_df).vocabulary"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#Tf-idf"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 220,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import HashingTF,IDF"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 221,
   "metadata": {},
   "outputs": [],
   "source": [
    "hashing_vec=HashingTF(inputCol='refined_tokens',outputCol='tf_features')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 222,
   "metadata": {},
   "outputs": [],
   "source": [
    "hashing_df=hashing_vec.transform(refined_df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 225,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+----------------------------------+-------------------------------------------------------+\n",
      "|user_id|refined_tokens                    |tf_features                                            |\n",
      "+-------+----------------------------------+-------------------------------------------------------+\n",
      "|1      |[really, liked, movie]            |(262144,[14,32675,155321],[1.0,1.0,1.0])               |\n",
      "|2      |[recommend, movie, friends]       |(262144,[129613,155321,222394],[1.0,1.0,1.0])          |\n",
      "|3      |[movie, alright, acting, horrible]|(262144,[80824,155321,236263,240286],[1.0,1.0,1.0,1.0])|\n",
      "|4      |[never, watching, movie, ever]    |(262144,[63139,155321,203802,245806],[1.0,1.0,1.0,1.0])|\n",
      "+-------+----------------------------------+-------------------------------------------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "hashing_df.select(['user_id','refined_tokens','tf_features']).show(4,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 227,
   "metadata": {},
   "outputs": [],
   "source": [
    "tf_idf_vec=IDF(inputCol='tf_features',outputCol='tf_idf_features')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 228,
   "metadata": {},
   "outputs": [],
   "source": [
    "tf_idf_df=tf_idf_vec.fit(hashing_df).transform(hashing_df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 230,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+----------------------------------------------------------------------------------------------------+\n",
      "|user_id|tf_idf_features                                                                                     |\n",
      "+-------+----------------------------------------------------------------------------------------------------+\n",
      "|1      |(262144,[14,32675,155321],[0.9162907318741551,0.9162907318741551,0.0])                              |\n",
      "|2      |(262144,[129613,155321,222394],[0.9162907318741551,0.0,0.9162907318741551])                         |\n",
      "|3      |(262144,[80824,155321,236263,240286],[0.9162907318741551,0.0,0.9162907318741551,0.9162907318741551])|\n",
      "|4      |(262144,[63139,155321,203802,245806],[0.9162907318741551,0.0,0.9162907318741551,0.9162907318741551])|\n",
      "+-------+----------------------------------------------------------------------------------------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "tf_idf_df.select(['user_id','tf_idf_features']).show(4,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Classification "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 241,
   "metadata": {},
   "outputs": [],
   "source": [
    "text_df=spark.read.csv('Movie_reviews.csv',inferSchema=True,header=True,sep=',')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 242,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- Review: string (nullable = true)\n",
      " |-- Sentiment: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "text_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 243,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "7087"
      ]
     },
     "execution_count": 243,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "text_df.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 244,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import rand "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 245,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------------------------------------------------------------------+---------+\n",
      "|Review                                                                  |Sentiment|\n",
      "+------------------------------------------------------------------------+---------+\n",
      "|My dad's being stupid about brokeback mountain...                       |0        |\n",
      "|Ok brokeback mountain is such a horrible movie.                         |0        |\n",
      "|I love Brokeback Mountain.                                              |1        |\n",
      "|He's like,'YEAH I GOT ACNE AND I LOVE BROKEBACK MOUNTAIN '..            |1        |\n",
      "|Harry Potter and the Sorcerer's Stone is great but I had forgotten what |1        |\n",
      "|\"Anyway, thats why I love \"\" Brokeback Mountain.\"                       |1        |\n",
      "|Which is why i said silent hill turned into reality coz i was hella like|1        |\n",
      "|Apparently the Da Vinci code sucks.                                     |0        |\n",
      "|I am going to start reading the Harry Potter series again because that i|1        |\n",
      "|So as felicia's mom is cleaning the table, felicia grabs my keys and we |1        |\n",
      "+------------------------------------------------------------------------+---------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "text_df.orderBy(rand()).show(10,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 246,
   "metadata": {},
   "outputs": [],
   "source": [
    "text_df=text_df.filter(((text_df.Sentiment =='1') | (text_df.Sentiment =='0')))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 247,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "6990"
      ]
     },
     "execution_count": 247,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "text_df.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 248,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------+-----+\n",
      "|Sentiment|count|\n",
      "+---------+-----+\n",
      "|        0| 3081|\n",
      "|        1| 3909|\n",
      "+---------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "text_df.groupBy('Sentiment').count().show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 249,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- Review: string (nullable = true)\n",
      " |-- Sentiment: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "text_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 250,
   "metadata": {},
   "outputs": [],
   "source": [
    "text_df = text_df.withColumn(\"Label\", text_df.Sentiment.cast('float')).drop('Sentiment')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 251,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------------------------------------------------------------------+-----+\n",
      "|Review                                                                  |Label|\n",
      "+------------------------------------------------------------------------+-----+\n",
      "|I hate Harry Potter.                                                    |0.0  |\n",
      "|I am the only person in the world who thought Brokeback Mountain sucked.|0.0  |\n",
      "|Not because I hate Harry Potter, but because I am the type of person tha|0.0  |\n",
      "|Which is why i said silent hill turned into reality coz i was hella like|1.0  |\n",
      "|The Da Vinci Code sucked big time.                                      |0.0  |\n",
      "|The Da Vinci Code sucked big time.                                      |0.0  |\n",
      "|Ok brokeback mountain is such a horrible movie.                         |0.0  |\n",
      "|A / N: This is a gift for sivullinen who requested: ” I'd love some NC  |1.0  |\n",
      "|, she helped me bobbypin my insanely cool hat to my head, and she laughe|0.0  |\n",
      "|I love the Da Vinci Code.                                               |1.0  |\n",
      "+------------------------------------------------------------------------+-----+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "text_df.orderBy(rand()).show(10,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 252,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+-----+\n",
      "|label|count|\n",
      "+-----+-----+\n",
      "|  1.0| 3909|\n",
      "|  0.0| 3081|\n",
      "+-----+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "text_df.groupBy('label').count().show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 253,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Add length to the dataframe\n",
    "from pyspark.sql.functions import length"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 254,
   "metadata": {},
   "outputs": [],
   "source": [
    "text_df=text_df.withColumn('length',length(text_df['Review']))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 255,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------------------------------------------------------------------+-----+------+\n",
      "|Review                                                                  |Label|length|\n",
      "+------------------------------------------------------------------------+-----+------+\n",
      "|I have to say that I loved Brokeback Mountain.                          |1.0  |46    |\n",
      "|The Da Vinci Code is awesome!!                                          |1.0  |30    |\n",
      "|Oh, and Brokeback Mountain was a terrible movie.                        |0.0  |48    |\n",
      "|man i loved brokeback mountain!                                         |1.0  |31    |\n",
      "|Even though Brokeback Mountain is one of the most depressing movies, eve|0.0  |72    |\n",
      "|da vinci code sucks...                                                  |0.0  |22    |\n",
      "|Combining the opinion / review from Gary and Gin Zen, The Da Vinci Code |0.0  |71    |\n",
      "|da vinci code sucks...                                                  |0.0  |22    |\n",
      "|Finally feel up to making the long ass drive out to the Haunt tonight...|1.0  |72    |\n",
      "|the last stand and Mission Impossible 3 both were awesome movies.       |1.0  |65    |\n",
      "+------------------------------------------------------------------------+-----+------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "text_df.orderBy(rand()).show(10,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 256,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+-----------------+\n",
      "|Label|      avg(Length)|\n",
      "+-----+-----------------+\n",
      "|  1.0|47.61882834484523|\n",
      "|  0.0|50.95845504706264|\n",
      "+-----+-----------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "text_df.groupBy('Label').agg({'Length':'mean'}).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 108,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Data Cleaning"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 257,
   "metadata": {},
   "outputs": [],
   "source": [
    "tokenization=Tokenizer(inputCol='Review',outputCol='tokens')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 258,
   "metadata": {},
   "outputs": [],
   "source": [
    "tokenized_df=tokenization.transform(text_df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 259,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+-----+------+--------------------+\n",
      "|              Review|Label|length|              tokens|\n",
      "+--------------------+-----+------+--------------------+\n",
      "|The Da Vinci Code...|  1.0|    39|[the, da, vinci, ...|\n",
      "|this was the firs...|  1.0|    72|[this, was, the, ...|\n",
      "|i liked the Da Vi...|  1.0|    32|[i, liked, the, d...|\n",
      "|i liked the Da Vi...|  1.0|    32|[i, liked, the, d...|\n",
      "|I liked the Da Vi...|  1.0|    72|[i, liked, the, d...|\n",
      "|that's not even a...|  1.0|    72|[that's, not, eve...|\n",
      "|I loved the Da Vi...|  1.0|    72|[i, loved, the, d...|\n",
      "|i thought da vinc...|  1.0|    57|[i, thought, da, ...|\n",
      "|The Da Vinci Code...|  1.0|    45|[the, da, vinci, ...|\n",
      "|I thought the Da ...|  1.0|    51|[i, thought, the,...|\n",
      "|The Da Vinci Code...|  1.0|    68|[the, da, vinci, ...|\n",
      "|The Da Vinci Code...|  1.0|    62|[the, da, vinci, ...|\n",
      "|then I turn on th...|  1.0|    66|[then, i, turn, o...|\n",
      "|The Da Vinci Code...|  1.0|    34|[the, da, vinci, ...|\n",
      "|i love da vinci c...|  1.0|    24|[i, love, da, vin...|\n",
      "|i loved da vinci ...|  1.0|    23|[i, loved, da, vi...|\n",
      "|TO NIGHT:: THE DA...|  1.0|    52|[to, night::, the...|\n",
      "|THE DA VINCI CODE...|  1.0|    40|[the, da, vinci, ...|\n",
      "|Thing is, I enjoy...|  1.0|    38|[thing, is,, i, e...|\n",
      "|very da vinci cod...|  1.0|    38|[very, da, vinci,...|\n",
      "+--------------------+-----+------+--------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "tokenized_df.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 260,
   "metadata": {},
   "outputs": [],
   "source": [
    "stopword_removal=StopWordsRemover(inputCol='tokens',outputCol='refined_tokens')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 261,
   "metadata": {},
   "outputs": [],
   "source": [
    "refined_text_df=stopword_removal.transform(tokenized_df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 262,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+-----+------+--------------------+--------------------+\n",
      "|              Review|Label|length|              tokens|      refined_tokens|\n",
      "+--------------------+-----+------+--------------------+--------------------+\n",
      "|The Da Vinci Code...|  1.0|    39|[the, da, vinci, ...|[da, vinci, code,...|\n",
      "|this was the firs...|  1.0|    72|[this, was, the, ...|[first, clive, cu...|\n",
      "|i liked the Da Vi...|  1.0|    32|[i, liked, the, d...|[liked, da, vinci...|\n",
      "|i liked the Da Vi...|  1.0|    32|[i, liked, the, d...|[liked, da, vinci...|\n",
      "|I liked the Da Vi...|  1.0|    72|[i, liked, the, d...|[liked, da, vinci...|\n",
      "|that's not even a...|  1.0|    72|[that's, not, eve...|[even, exaggerati...|\n",
      "|I loved the Da Vi...|  1.0|    72|[i, loved, the, d...|[loved, da, vinci...|\n",
      "|i thought da vinc...|  1.0|    57|[i, thought, da, ...|[thought, da, vin...|\n",
      "|The Da Vinci Code...|  1.0|    45|[the, da, vinci, ...|[da, vinci, code,...|\n",
      "|I thought the Da ...|  1.0|    51|[i, thought, the,...|[thought, da, vin...|\n",
      "|The Da Vinci Code...|  1.0|    68|[the, da, vinci, ...|[da, vinci, code,...|\n",
      "|The Da Vinci Code...|  1.0|    62|[the, da, vinci, ...|[da, vinci, code,...|\n",
      "|then I turn on th...|  1.0|    66|[then, i, turn, o...|[turn, light, rad...|\n",
      "|The Da Vinci Code...|  1.0|    34|[the, da, vinci, ...|[da, vinci, code,...|\n",
      "|i love da vinci c...|  1.0|    24|[i, love, da, vin...|[love, da, vinci,...|\n",
      "|i loved da vinci ...|  1.0|    23|[i, loved, da, vi...|[loved, da, vinci...|\n",
      "|TO NIGHT:: THE DA...|  1.0|    52|[to, night::, the...|[night::, da, vin...|\n",
      "|THE DA VINCI CODE...|  1.0|    40|[the, da, vinci, ...|[da, vinci, code,...|\n",
      "|Thing is, I enjoy...|  1.0|    38|[thing, is,, i, e...|[thing, is,, enjo...|\n",
      "|very da vinci cod...|  1.0|    38|[very, da, vinci,...|[da, vinci, code,...|\n",
      "+--------------------+-----+------+--------------------+--------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "refined_text_df.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 263,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import udf\n",
    "from pyspark.sql.types import IntegerType\n",
    "from pyspark.sql.functions import *"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 264,
   "metadata": {},
   "outputs": [],
   "source": [
    "len_udf = udf(lambda s: len(s), IntegerType())\n",
    "\n",
    "refined_text_df = refined_text_df.withColumn(\"token_count\", len_udf(col('refined_tokens')))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 268,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+-----+------+--------------------+--------------------+-----------+\n",
      "|              Review|Label|length|              tokens|      refined_tokens|token_count|\n",
      "+--------------------+-----+------+--------------------+--------------------+-----------+\n",
      "|da vinci code was...|  1.0|    37|[da, vinci, code,...|[da, vinci, code,...|          5|\n",
      "|Not because I hat...|  0.0|    72|[not, because, i,...|[hate, harry, pot...|          6|\n",
      "|I love Harry Potter.|  1.0|    20|[i, love, harry, ...|[love, harry, pot...|          3|\n",
      "|and I love Da Vin...|  1.0|    71|[and, i, love, da...|[love, da, vinci,...|          7|\n",
      "|Da Vinci Code = U...|  0.0|    72|[da, vinci, code,...|[da, vinci, code,...|         15|\n",
      "|Brokeback Mountai...|  1.0|    34|[brokeback, mount...|[brokeback, mount...|          3|\n",
      "|I think I hate Ha...|  0.0|    72|[i, think, i, hat...|[think, hate, har...|          9|\n",
      "|Harry Potter is b...|  1.0|    26|[harry, potter, i...|[harry, potter, b...|          3|\n",
      "|The Da Vinci Code...|  1.0|    30|[the, da, vinci, ...|[da, vinci, code,...|          4|\n",
      "|Combining the opi...|  0.0|    71|[combining, the, ...|[combining, opini...|         10|\n",
      "+--------------------+-----+------+--------------------+--------------------+-----------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "refined_text_df.orderBy(rand()).show(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 163,
   "metadata": {},
   "outputs": [],
   "source": [
    "count_vec=CountVectorizer(inputCol='refined_tokens',outputCol='features')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 164,
   "metadata": {},
   "outputs": [],
   "source": [
    "cv_text_df=count_vec.fit(refined_text_df).transform(refined_text_df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 270,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+-----------+--------------------+-----+\n",
      "|      refined_tokens|token_count|            features|Label|\n",
      "+--------------------+-----------+--------------------+-----+\n",
      "|[da, vinci, code,...|          5|(2302,[0,1,4,43,2...|  1.0|\n",
      "|[first, clive, cu...|          9|(2302,[11,51,229,...|  1.0|\n",
      "|[liked, da, vinci...|          5|(2302,[0,1,4,53,3...|  1.0|\n",
      "|[liked, da, vinci...|          5|(2302,[0,1,4,53,3...|  1.0|\n",
      "|[liked, da, vinci...|          8|(2302,[0,1,4,53,6...|  1.0|\n",
      "|[even, exaggerati...|          6|(2302,[46,229,271...|  1.0|\n",
      "|[loved, da, vinci...|          8|(2302,[0,1,22,30,...|  1.0|\n",
      "|[thought, da, vin...|          7|(2302,[0,1,4,228,...|  1.0|\n",
      "|[da, vinci, code,...|          6|(2302,[0,1,4,33,2...|  1.0|\n",
      "|[thought, da, vin...|          7|(2302,[0,1,4,223,...|  1.0|\n",
      "+--------------------+-----------+--------------------+-----+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "cv_text_df.select(['refined_tokens','token_count','features','Label']).show(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 271,
   "metadata": {},
   "outputs": [],
   "source": [
    "#select data for building model\n",
    "model_text_df=cv_text_df.select(['features','token_count','Label'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 272,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import VectorAssembler"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 273,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_assembler = VectorAssembler(inputCols=['features','token_count'],outputCol='features_vec')\n",
    "model_text_df = df_assembler.transform(model_text_df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 274,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- features: vector (nullable = true)\n",
      " |-- token_count: integer (nullable = true)\n",
      " |-- Label: float (nullable = true)\n",
      " |-- features_vec: vector (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "model_text_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 276,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.classification import LogisticRegression"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 277,
   "metadata": {},
   "outputs": [],
   "source": [
    "#split the data \n",
    "training_df,test_df=model_text_df.randomSplit([0.75,0.25])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 278,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+-----+\n",
      "|Label|count|\n",
      "+-----+-----+\n",
      "|  1.0| 2979|\n",
      "|  0.0| 2335|\n",
      "+-----+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "training_df.groupBy('Label').count().show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 279,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+-----+\n",
      "|Label|count|\n",
      "+-----+-----+\n",
      "|  1.0|  930|\n",
      "|  0.0|  746|\n",
      "+-----+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "test_df.groupBy('Label').count().show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 294,
   "metadata": {},
   "outputs": [],
   "source": [
    "log_reg=LogisticRegression(featuresCol='features_vec',labelCol='Label').fit(training_df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 295,
   "metadata": {},
   "outputs": [],
   "source": [
    "results=log_reg.evaluate(test_df).predictions"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 296,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+-----------+-----+--------------------+--------------------+--------------------+----------+\n",
      "|            features|token_count|Label|        features_vec|       rawPrediction|         probability|prediction|\n",
      "+--------------------+-----------+-----+--------------------+--------------------+--------------------+----------+\n",
      "|(2302,[0,1,4,5,64...|          6|  1.0|(2303,[0,1,4,5,64...|[-17.272830422692...|[3.15141100218827...|       1.0|\n",
      "|(2302,[0,1,4,5,89...|          9|  1.0|(2303,[0,1,4,5,89...|[-5.3071943841355...|[0.00493137238287...|       1.0|\n",
      "|(2302,[0,1,4,5,30...|          5|  1.0|(2303,[0,1,4,5,30...|[-20.050569575912...|[1.95951356060452...|       1.0|\n",
      "|(2302,[0,1,4,5,44...|          5|  1.0|(2303,[0,1,4,5,44...|[-20.154922616911...|[1.76533984442990...|       1.0|\n",
      "|(2302,[0,1,4,5,82...|          6|  1.0|(2303,[0,1,4,5,82...|[-14.417812465440...|[5.47549475575723...|       1.0|\n",
      "|(2302,[0,1,4,11,1...|          6|  0.0|(2303,[0,1,4,11,1...|[19.1666519710833...|[0.99999999525726...|       0.0|\n",
      "|(2302,[0,1,4,11,4...|          7|  1.0|(2303,[0,1,4,11,4...|[-19.931332074913...|[2.20766138812030...|       1.0|\n",
      "|(2302,[0,1,4,12,1...|          8|  1.0|(2303,[0,1,4,12,1...|[0.94242754202479...|[0.71958974868604...|       0.0|\n",
      "|(2302,[0,1,4,12,1...|          5|  1.0|(2303,[0,1,4,12,1...|[-16.855454538304...|[4.78375669750487...|       1.0|\n",
      "|(2302,[0,1,4,12,3...|          8|  1.0|(2303,[0,1,4,12,3...|[-25.986485886596...|[5.17860248497715...|       1.0|\n",
      "|(2302,[0,1,4,12,3...|          5|  1.0|(2303,[0,1,4,12,3...|[-20.047155070146...|[1.96621576679026...|       1.0|\n",
      "|(2302,[0,1,4,12,3...|          5|  1.0|(2303,[0,1,4,12,3...|[-20.047155070146...|[1.96621576679026...|       1.0|\n",
      "|(2302,[0,1,4,12,3...|          5|  1.0|(2303,[0,1,4,12,3...|[-20.047155070146...|[1.96621576679026...|       1.0|\n",
      "|(2302,[0,1,4,12,3...|          5|  1.0|(2303,[0,1,4,12,3...|[-20.047155070146...|[1.96621576679026...|       1.0|\n",
      "|(2302,[0,1,4,12,3...|          5|  1.0|(2303,[0,1,4,12,3...|[-20.047155070146...|[1.96621576679026...|       1.0|\n",
      "|(2302,[0,1,4,12,3...|          5|  1.0|(2303,[0,1,4,12,3...|[-20.047155070146...|[1.96621576679026...|       1.0|\n",
      "|(2302,[0,1,4,12,3...|          5|  1.0|(2303,[0,1,4,12,3...|[-20.047155070146...|[1.96621576679026...|       1.0|\n",
      "|(2302,[0,1,4,12,3...|          5|  1.0|(2303,[0,1,4,12,3...|[-20.047155070146...|[1.96621576679026...|       1.0|\n",
      "|(2302,[0,1,4,12,3...|          5|  1.0|(2303,[0,1,4,12,3...|[-20.047155070146...|[1.96621576679026...|       1.0|\n",
      "|(2302,[0,1,4,12,3...|          5|  1.0|(2303,[0,1,4,12,3...|[-20.047155070146...|[1.96621576679026...|       1.0|\n",
      "+--------------------+-----------+-----+--------------------+--------------------+--------------------+----------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "results.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 297,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.evaluation import BinaryClassificationEvaluator\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 298,
   "metadata": {},
   "outputs": [],
   "source": [
    "#confusion matrix\n",
    "true_postives = results[(results.Label == 1) & (results.prediction == 1)].count()\n",
    "true_negatives = results[(results.Label == 0) & (results.prediction == 0)].count()\n",
    "false_positives = results[(results.Label == 0) & (results.prediction == 1)].count()\n",
    "false_negatives = results[(results.Label == 1) & (results.prediction == 0)].count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 299,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "0.986021505376344\n"
     ]
    }
   ],
   "source": [
    "recall = float(true_postives)/(true_postives + false_negatives)\n",
    "print(recall)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 300,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "0.9572025052192067\n"
     ]
    }
   ],
   "source": [
    "precision = float(true_postives) / (true_postives + false_positives)\n",
    "print(precision)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 301,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "0.9677804295942721\n"
     ]
    }
   ],
   "source": [
    "accuracy=float((true_postives+true_negatives) /(results.count()))\n",
    "print(accuracy)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
